/**
 * @file kafka_threadpool_integration_example.cpp
 * @brief Kafka模块线程池集成使用示例
 * @author AI Assistant
 * @date 2025/7/10
 * 
 * 本示例展示了如何使用集成了线程池的Kafka生产者和消费者，包括：
 * - 基础配置和初始化
 * - 异步操作使用
 * - 配置热更新
 * - 性能监控
 * - 优雅关闭
 */

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include <future>
#include <signal.h>

#include "common/kafka/kafka_producer.h"
#include "common/kafka/kafka_consumer.h"
#include "common/config/config_manager.h"
#include "common/logger/logger.h"

using namespace common::messaging;
using namespace common::config;

class KafkaThreadPoolDemo {
public:
    void run() {
        std::cout << "=== Kafka模块线程池集成示例 ===" << std::endl;
        
        try {
            // 1. 初始化配置管理器
            initializeConfig();
            
            // 2. 创建Kafka生产者和消费者
            createKafkaComponents();
            
            // 3. 演示基础功能
            demonstrateBasicOperations();
            
            // 4. 演示异步操作
            demonstrateAsyncOperations();
            
            // 5. 演示性能监控
            demonstrateMonitoring();
            
            // 6. 演示配置热更新
            demonstrateConfigHotReload();
            
            // 7. 优雅关闭
            gracefulShutdown();
            
        } catch (const std::exception& e) {
            std::cerr << "示例运行异常: " << e.what() << std::endl;
        }
    }

private:
    std::unique_ptr<KafkaProducer> producer_;
    std::unique_ptr<KafkaConsumer> consumer_;
    std::string test_topic_ = "demo-topic";
    
    void initializeConfig() {
        std::cout << "\n1. 初始化配置管理器..." << std::endl;
        
        auto& config = ConfigManager::getInstance();
        
        // 尝试加载配置文件
        bool config_loaded = false;
        try {
            config_loaded = config.loadFromFile("config/kafka_threadpool_config.yml");
            if (config_loaded) {
                std::cout << "   配置文件加载成功" << std::endl;
            }
        } catch (const std::exception& e) {
            std::cout << "   无法加载配置文件，使用默认配置: " << e.what() << std::endl;
        }
        
        // 如果配置文件加载失败，设置默认配置
        if (!config_loaded) {
            // Kafka生产者配置
            config.set("kafka.producer.brokers", "kafka:9092");
            config.set("kafka.producer.client_id", "demo-producer");
            config.set("kafka.producer.thread_pool.enable_async_operations", "true");
            config.set("kafka.producer.thread_pool.core_size", "4");
            config.set("kafka.producer.thread_pool.max_size", "8");
            
            // Kafka消费者配置
            config.set("kafka.consumer.brokers", "kafka:9092");
            config.set("kafka.consumer.group_id", "demo-group");
            config.set("kafka.consumer.client_id", "demo-consumer");
            config.set("kafka.consumer.thread_pool.enable_async_operations", "true");
            config.set("kafka.consumer.thread_pool.core_size", "4");
            config.set("kafka.consumer.thread_pool.max_size", "8");
            
            std::cout << "   默认配置设置完成" << std::endl;
        }
    }
    
    void createKafkaComponents() {
        std::cout << "\n2. 创建Kafka组件..." << std::endl;
        
        // 创建生产者
        producer_ = KafkaProducer::createFromConfig();
        if (producer_) {
            producer_->enableConfigHotReload();
            std::cout << "   生产者创建成功" << std::endl;
            std::cout << "   异步操作: " << (producer_->isAsyncOperationsEnabled() ? "启用" : "禁用") << std::endl;
        }
        
        // 创建消费者
        consumer_ = KafkaConsumer::createFromConfig();
        if (consumer_) {
            consumer_->enableConfigHotReload();
            consumer_->subscribe({test_topic_});
            std::cout << "   消费者创建成功" << std::endl;
            std::cout << "   异步操作: " << (consumer_->isAsyncOperationsEnabled() ? "启用" : "禁用") << std::endl;
        }
    }
    
    void demonstrateBasicOperations() {
        std::cout << "\n3. 演示基础操作..." << std::endl;
        
        if (!producer_) {
            std::cout << "   生产者未创建，跳过基础操作演示" << std::endl;
            return;
        }
        
        // 发送单条消息
        bool success = producer_->send(test_topic_, "demo-key", "Hello Kafka ThreadPool!");
        std::cout << "   单条消息发送: " << (success ? "成功" : "失败") << std::endl;
        
        // 刷新生产者
        producer_->flush(1000);
        std::cout << "   生产者刷新完成" << std::endl;
    }
    
    void demonstrateAsyncOperations() {
        std::cout << "\n4. 演示异步操作..." << std::endl;
        
        if (!producer_ || !producer_->isAsyncOperationsEnabled()) {
            std::cout << "   生产者异步操作未启用，跳过演示" << std::endl;
            return;
        }
        
        // 批量异步发送
        std::cout << "   批量异步发送消息..." << std::endl;
        std::vector<std::tuple<std::string, std::string, std::string>> messages;
        for (int i = 0; i < 5; ++i) {
            messages.emplace_back(test_topic_, "async-key-" + std::to_string(i), 
                                "Async message " + std::to_string(i));
        }
        
        auto start_time = std::chrono::steady_clock::now();
        auto future_results = producer_->sendBatchAsync(messages);
        
        // 异步刷新
        std::cout << "   执行异步刷新..." << std::endl;
        producer_->flushAsync(1000);
        
        // 异步轮询
        std::cout << "   执行异步轮询..." << std::endl;
        producer_->pollAsync(100);
        
        // 等待批量发送完成
        try {
            auto results = future_results.get();
            auto end_time = std::chrono::steady_clock::now();
            auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
            
            std::cout << "   批量发送完成，耗时: " << duration.count() << "ms" << std::endl;
            std::cout << "   发送结果: " << results.size() << " 条消息" << std::endl;
            
            int success_count = std::count(results.begin(), results.end(), true);
            std::cout << "   成功发送: " << success_count << "/" << results.size() << std::endl;
        } catch (const std::exception& e) {
            std::cout << "   批量发送异常: " << e.what() << std::endl;
        }
        
        // 演示消费者异步操作
        if (consumer_ && consumer_->isAsyncOperationsEnabled()) {
            std::cout << "   演示消费者异步操作..." << std::endl;
            
            // 异步处理消息
            consumer_->processMessageAsync(test_topic_, "test-key", "test-value", 12345);
            
            // 异步提交偏移量
            consumer_->commitAsyncEnhanced();
            
            // 异步健康检查
            consumer_->performAsyncHealthCheck();
            
            std::cout << "   消费者异步操作调用完成" << std::endl;
        }
        
        // 等待一段时间让异步操作完成
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
    
    void demonstrateMonitoring() {
        std::cout << "\n5. 演示性能监控..." << std::endl;
        
        // 显示生产者状态
        if (producer_) {
            std::cout << "   生产者线程池状态:" << std::endl;
            std::cout << producer_->getThreadPoolStatus() << std::endl;
        }
        
        // 显示消费者状态
        if (consumer_) {
            std::cout << "   消费者线程池状态:" << std::endl;
            std::cout << consumer_->getThreadPoolStatus() << std::endl;
        }
    }
    
    void demonstrateConfigHotReload() {
        std::cout << "\n6. 演示配置热更新..." << std::endl;
        
        auto& config = ConfigManager::getInstance();
        
        // 启用热更新（每2秒检查一次）
        config.enableHotReload("config/kafka_threadpool_config.yml", std::chrono::seconds(2));
        
        std::cout << "   配置热更新已启用" << std::endl;
        std::cout << "   您可以修改配置文件来测试热更新功能" << std::endl;
        std::cout << "   等待5秒以观察可能的配置变化..." << std::endl;
        
        // 等待一段时间以观察配置变化
        std::this_thread::sleep_for(std::chrono::seconds(5));
        
        config.disableHotReload();
        std::cout << "   配置热更新已禁用" << std::endl;
    }
    
    void gracefulShutdown() {
        std::cout << "\n7. 优雅关闭..." << std::endl;
        
        if (producer_) {
            std::cout << "   关闭生产者..." << std::endl;
            producer_->flush(2000);  // 最后刷新
            producer_.reset();
        }
        
        if (consumer_) {
            std::cout << "   关闭消费者..." << std::endl;
            consumer_->stop();
            consumer_.reset();
        }
        
        std::cout << "   所有Kafka组件已关闭" << std::endl;
    }
};

// 全局变量用于信号处理
std::unique_ptr<KafkaThreadPoolDemo> g_demo;

void signalHandler(int signal) {
    std::cout << "\n收到信号 " << signal << "，开始优雅关闭..." << std::endl;
    if (g_demo) {
        g_demo.reset();
    }
    exit(0);
}

int main() {
    // 设置信号处理
    signal(SIGINT, signalHandler);
    signal(SIGTERM, signalHandler);
    
    try {
        g_demo = std::make_unique<KafkaThreadPoolDemo>();
        g_demo->run();
        
        std::cout << "\n=== 示例运行完成 ===" << std::endl;
        return 0;
        
    } catch (const std::exception& e) {
        std::cerr << "程序异常: " << e.what() << std::endl;
        return 1;
    }
}
